{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'\\n第一种：读取一个外部数据集。\\n比如，从本地文件加载数据集，或者从HDFS文件系统、HBase、Cassandra、Amazon S3等外部数据源中加载数据集。\\nSpark可以支持文本文件、SequenceFile文件（Hadoop提供的 SequenceFile是一个由二进制序列化过的key/value的字节流组成的文本存储文件）\\n和其他符合Hadoop InputFormat格式的文件。\\n'"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# RDD创建\n",
    "\"\"\"\n",
    "第一种：读取一个外部数据集。\n",
    "比如，从本地文件加载数据集，或者从HDFS文件系统、HBase、Cassandra、Amazon S3等外部数据源中加载数据集。\n",
    "Spark可以支持文本文件、SequenceFile文件（Hadoop提供的 SequenceFile是一个由二进制序列化过的key/value的字节流组成的文本存储文件）\n",
    "和其他符合Hadoop InputFormat格式的文件。\n",
    "\"\"\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'\\n启动时可能会出现如下 WARN 提示：WARN util.NativeCodeLoader: \\nUnable to load native-hadoop library for your platform… using builtin-java classes where applicable WARN 提示\\n可以忽略，并不会影响正常使用。\\n'"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "\"\"\"hadoop hdfs 伪分布式\n",
    "http://dblab.xmu.edu.cn/blog/install-hadoop/\n",
    "Hadoop配置文件说明\n",
    "Hadoop 的运行方式是由配置文件决定的（运行 Hadoop 时会读取配置文件），\n",
    "因此如果需要从伪分布式模式切换回非分布式模式，需要删除 core-site.xml 中的配置项。\n",
    "\n",
    "此外，伪分布式虽然只需要配置 fs.defaultFS 和 dfs.replication 就可以运行（官方教程如此），\n",
    "不过若没有配置 hadoop.tmp.dir 参数，则默认使用的临时目录为 /tmp/hadoo-hadoop，\n",
    "而这个目录在重启时有可能被系统清理掉，导致必须重新执行 format 才行。所以我们进行了设置，\n",
    "同时也指定 dfs.namenode.name.dir 和 dfs.datanode.data.dir，否则在接下来的步骤中可能会出错。\n",
    "\"\"\"\n",
    "# 配置完成后，执行 NameNode 的格式化:\n",
    "# $ ./bin/hdfs namenode -format\n",
    "\n",
    "# 接着开启 NameNode 和 DataNode 守护进程。\n",
    "# $ ./sbin/start-dfs.sh\n",
    "\n",
    "\"\"\"\n",
    "启动时可能会出现如下 WARN 提示：WARN util.NativeCodeLoader: \n",
    "Unable to load native-hadoop library for your platform… using builtin-java classes where applicable WARN 提示\n",
    "可以忽略，并不会影响正常使用。\n",
    "\"\"\"\n",
    "\n",
    "# 启动完成后，可以通过命令 `jps` 来判断是否成功启动，"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark import SparkContext\n",
    "sc = SparkContext()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "lines = sc.textFile(\"hdfs://localhost:9000/user/shilinlee/word.txt\")\n",
    "# >>> lines = sc.textFile(\"/user/shilinlee/word.txt\")\n",
    "# >>> lines = sc.textFile(\"word.txt\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "{'Spark': 1, 'world': 1, 'good': 1, 'hello': 3, 'Scala': 1, 'morning': 1}"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "lines.flatMap(lambda x: x.split()).map(lambda line: (line, 1)).reduceByKey(lambda a, b: a+b).collectAsMap()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'\\n第二种：调用SparkContext的parallelize方法，在Driver中一个已经存在的集合（数组）上创建。\\n'"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "\"\"\"\n",
    "第二种：调用SparkContext的parallelize方法，在Driver中一个已经存在的集合（数组）上创建。\n",
    "\"\"\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "nums = [1,2,3,4,5]\n",
    "rdd = sc.parallelize(nums)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "ParallelCollectionRDD[7] at parallelize at PythonRDD.scala:194"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# 会调用persist(MEMORY_ONLY)，但是，语句执行到这里，并不会缓存rdd，这时rdd还没有被计算生成, \n",
    "# 而是要等到遇到第一个行动操作触发真正计算以后，才会把计算结果进行持久化\n",
    "rdd.cache()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "5"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "rdd.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[1, 2, 3, 4, 5]"
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "rdd.collect() # 从缓存中读取"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "ParallelCollectionRDD[7] at parallelize at PythonRDD.scala:194"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "rdd.unpersist()  # 主动清除缓存"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[1, 2, 3, 4, 5]"
      ]
     },
     "execution_count": 12,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "rdd.collect() # 从头计算"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
